Query-aware statistics requests via ScanArgs / ScanResult (RFC for #21624) #21996

adriangb wants to merge 2 commits into apache:main from
Conversation
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).
```rust
// Answer any requested stats from the table-level metadata we
// already touched. Anything not derivable from the dense
// `Statistics` we computed comes back as `Absent`. Skipped
// entirely when the caller didn't ask. We also skip when
// `collect_statistics=false` — the contract is "answer what's
// free", and computing stats here just to populate this map
// would violate that.
```
Long term I think it'd be good to get rid of the dense statistics (or, as a first step, only create them ephemerally when we read them from the footer), but that kind of has to happen after there are no more consumers. It seemed easier to derive the sparse stats from the dense stats for now.
```rust
/// manifests, Hive Metastore, custom catalogs) can populate this
/// directly without rebuilding a full dense `Statistics`.
pub satisfied_stats:
    Option<Arc<std::collections::HashMap<StatisticsRequest, StatisticsValue>>>,
```
A type alias for this type might be nice.
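Something like this, perhaps (a minimal sketch; the alias name `SatisfiedStats` is hypothetical, and `StatisticsRequest` / `StatisticsValue` are the types introduced in this PR):

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical alias for the sparse per-file stats map.
pub type SatisfiedStats = Arc<HashMap<StatisticsRequest, StatisticsValue>>;
```

The field above would then read `pub satisfied_stats: Option<SatisfiedStats>`.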
```rust
let file_stats = partitioned_file.statistics.as_ref()?;
let file_stats_pruning =
    PrunableStatistics::new(vec![file_stats.clone()], Arc::clone(file_schema));
let file_stats_pruning: Box<dyn PruningStatistics + Send + Sync> =
```
PruningPredicate also uses this trait, so it might not be that hard to port there too!
Adds an opt-in handshake that lets callers ask a `TableProvider` for specific stats by name and receive only what the provider can answer cheaply, instead of the all-or-nothing dense `Statistics` we have today.

## What's new

* `datafusion-common::stats::StatisticsRequest` — enum of stat kinds that mirror `Statistics` / `ColumnStatistics` (Min, Max, NullCount, DistinctCount, Sum, ByteSize, RowCount, TotalByteSize). `Hash + Eq` so it can key a `HashMap`.
* `datafusion-common::stats::StatisticsValue` — `Scalar(Precision<...>) | Distribution(Arc<dyn Any>) | Sketch(Arc<dyn Any>) | Absent`. Whether a value is exact or estimated travels in the `Precision` wrapper, not the variant.
* `ScanArgs::with_statistics_requests` / `statistics_requests()` — the caller's question.
* `ScanResult::with_statistics` / `statistics()` / `into_parts()` — the provider's answer, paired 1:1 with the requests slice.
* `PartitionedFile::satisfied_stats` — sparse `Arc<HashMap<StatisticsRequest, StatisticsValue>>` for per-file answers. Memory scales with what was asked, not with table width. Providers that store stats out-of-band (Delta/Iceberg/Hudi manifests, Hive Metastore, custom catalogs) can populate this directly without rebuilding a full dense `Statistics`.
* `FilePruner` learns to consume the sparse map. Internally, `file_stats_pruning` is now `Box<dyn PruningStatistics + Send + Sync>` so we can dispatch between the existing `PrunableStatistics` (dense) and a new `SparseFilePruningStats` adapter (sparse). The sparse adapter looks up each `StatisticsRequest` directly in the map and materializes single-row arrays only for the columns the pruning predicate touches — no densify-then-throw-away.
* `ListingTable::scan_with_args` populates `ScanResult.statistics` from the merged dense `Statistics` it already computed when `args.statistics_requests()` is set and `collect_statistics=true`. When `collect_statistics=false` it returns `Absent` for everything (the contract is "answer what's free"). `DistinctCount`/`Sum`/`ByteSize` are likewise `Absent` for parquet — those aren't in thrift footers; layered helpers (or richer providers) can fill the gaps.

## Backwards compat

All additions are opt-in:

* `ScanArgs` / `ScanResult` gain new fields with `Default`-friendly initializers; existing callers that don't use the new builders see no change.
* `FilePruner`'s field-type change is internal (private field).
* The only minor source-level break is a new pub field on `PartitionedFile` (`satisfied_stats`). Callers using `PartitionedFile::new` / `From<ObjectMeta>` / the existing builders are unaffected. Direct struct literals — uncommon, none in-tree — need to add `satisfied_stats: None` (or use the new `with_satisfied_stats` builder).

## Tests

* `datafusion-common::stats::tests::statistics_request_is_hashable_keyable` — round-trip a `StatisticsRequest` through a `HashMap`.
* `datafusion-pruning::file_pruner::tests` — three tests demonstrating end-to-end pruning against a sparse-only `PartitionedFile` (`x > 100` prunes a `[10, 20]` file, `x > 15` doesn't, no stats at all → no pruner).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
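To make the shape concrete, here is a hedged sketch of the handshake from the caller's side. The builder and accessor names come from the list above; the `StatisticsRequest` variant shapes, the import paths, and the return type of `statistics()` are assumptions, not the final API:

```rust
use std::sync::Arc;

use datafusion_catalog::{ScanArgs, Session, TableProvider}; // paths assumed
use datafusion_common::stats::StatisticsRequest; // location per this message
use datafusion_common::Result;

// Hypothetical caller: ask for two stats, read back the paired answers.
async fn probe_stats(
    provider: Arc<dyn TableProvider>,
    session: &dyn Session,
) -> Result<()> {
    // Assumed variant shapes: Min keyed by column name, RowCount table-level.
    let requests = vec![
        StatisticsRequest::Min("price".into()),
        StatisticsRequest::RowCount,
    ];
    let args = ScanArgs::default().with_statistics_requests(&requests);
    let result = provider.scan_with_args(session, args).await?;

    // Answers pair 1:1 with the requests slice; anything the provider
    // could not produce cheaply comes back as `Absent`.
    if let Some(answers) = result.statistics() {
        for (req, answer) in requests.iter().zip(answers.iter()) {
            println!("{req:?} -> {answer:?}");
        }
    }
    Ok(())
}
```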
```rust
// Map column-name -> originating TableReference (last writer wins
// when names collide; we accept that imprecision as a POC).
```
This needs to be sorted out
Stacked on top of the API-only commit. Adds the missing piece: a
small optimizer rule that walks the optimized logical plan and
populates `TableScan.statistics_requests` based on the surrounding
plan shape, plus a physical-planner hook that threads those into
`ScanArgs::with_statistics_requests`.
* `TableScan` gains `statistics_requests: Vec<StatisticsRequest>`
(default empty) and a `with_statistics_requests` builder.
* New `RequestStatistics` `OptimizerRule` (registered last in the
  default pipeline). Walks the plan once, derives:
  - Sort → Min / Max / NullCount on each sort key
  - Filter → Min / Max / NullCount / DistinctCount on referenced cols
  - Join → DistinctCount / NullCount on join keys (both sides)
  - always → RowCount per scan

  Stable, deterministic ordering. Idempotent. Never reshapes the
  plan — only annotates `TableScan` nodes.
* `DefaultPhysicalPlanner` reads `scan.statistics_requests` and
  threads them into `ScanArgs::with_statistics_requests` when calling
  `provider.scan_with_args` (see the sketch after this message).
* `ScanArgs::statistics_requests` field switched from
`Option<&[StatisticsRequest]>` to `&[StatisticsRequest]` (empty
slice = no requests; collapses two ways of saying the same thing).
* `request_statistics::tests` (3 unit tests) — confirm RowCount per
scan, filter-column requests, join-key DistinctCount.
* `user_defined::statistics_requests` (2 e2e tests) — register a
`RecordingTable` provider, run SQL through the full pipeline, assert
the requests that reached `scan_with_args` match what the plan
shape implies.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
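For concreteness, a hedged sketch of the physical-planner hook from the bullet above (the surrounding planner code and variable names are illustrative, not the actual diff):

```rust
// Inside the hypothetical TableScan lowering in DefaultPhysicalPlanner:
// thread the logical plan's requests through to the provider.
let args = ScanArgs::default()
    .with_statistics_requests(&scan.statistics_requests);
let result = provider.scan_with_args(session_state, args).await?;
```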
Really interesting work @adriangb, addressing an important gap; this has potential to unlock a lot of further development on the statistics front. I've been working on several related pieces (ExpressionAnalyzer #21122, StatisticsRegistry #21483, StatisticsContext #21815) and wanted to offer some perspective on how they might connect.

On the ExpressionAnalyzer (#21122) relationship

Your framing of "orthogonal with overlap" is very convincing to me, but I wonder if the split might be even cleaner than presented. Keeping …

Sketches, advanced stats, and the broader statistics infrastructure

You mention "room to grow toward sketches, histograms, and selectivity stats later", which reminded me of the … If …

Happy to discuss in more detail if that would be useful!
Thanks @asolimando! Thinking about it a bit more, the axis that makes the split hard in DataFusion is that DataFusion often is federated to or federates to other systems. E.g. if the `TableProvider` is a Postgres table, it's completely reasonable that there is overlap in functionality (Postgres also has a planner, stats system, etc.). But DataFusion also talks to dumb Parquet files...

I think the key / nice thing would be to allow the implementer to decide what it can and can't provide (which I've kind of attempted in this PR). The next step would be to let it provide statistics for some parts of expressions and not others (i.e. split up the expression tree). But maybe it's okay to sidestep this for now and say "this change is compatible with these future directions but more immediately it solves some memory usage and usability issues".
## Which issue does this PR close?

* `datafusion.execution.collect_statistics` on wide tables #21624

## Rationale for this change
Today, statistics flow through DataFusion as an all-or-nothing dense `Statistics` struct: `collect_statistics=true` reads parquet thrift footers for every column of every file, allocates a `Vec<ColumnStatistics>` of length `num_columns` per file, and stores it whether the query references those columns or not. On wide tables that's a lot of memory and IO that the query may not need.

This is even worse for 3rd-party `TableProvider` implementations that may store statistics in an external catalog or in Parquet files in object store (think Delta/Iceberg, a Postgres-backed table, etc.). These implementations could efficiently pull statistics for 1-2 columns but are forced to pull all statistics since they don't know which ones DataFusion wants, leading to a lot of wasted work.

This PR proposes a small handshake on the existing `scan_with_args` API that lets a caller ask a `TableProvider` for specific stats by name, and lets the provider answer only what it can deliver cheaply. The shape is intentionally minimal — enough to unblock memory and third-party wins on its own, with room to grow toward sketches, histograms, and selectivity stats later.

## What changes are included in this PR?
The change is split into two stacked commits. My goal was to implement the APIs, statistics collection, and `FilePruner` as the first consumer. Porting all other consumers is a larger piece of work that I think should be done across multiple PRs.

### Commit 1: `catalog: query-aware statistics requests via ScanArgs / ScanResult`

The API surface itself.
#### New types in `datafusion-expr-common::statistics`

The variants of `StatisticsRequest` mirror the fields of `Statistics` / `ColumnStatistics`, so a provider that already populates one can answer the other trivially. Whether a value is exact or estimated travels in the `Precision` wrapper, not in the request kind itself.

#### `ScanArgs` / `ScanResult` extension

The contract: "answer what's free, leave the rest as `Absent`."
#### Per-file sparse stats

Memory scales with what was asked for, not with table width.
#### `FilePruner` consumes the sparse map

`FilePruner` learns to dispatch between the existing `PrunableStatistics` (dense) and a new `SparseFilePruningStats` adapter that does direct request-keyed lookups against the sparse map. No densify-then-throw-away.
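In the same spirit, a hedged sketch of what a request-keyed adapter can look like. The real `SparseFilePruningStats` lives in this PR; the `StatisticsRequest` / `StatisticsValue` variant shapes and the `PruningStatistics` import path are assumptions here:

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray};
use datafusion_common::stats::{Precision, StatisticsRequest, StatisticsValue};
use datafusion_common::{Column, ScalarValue};
use datafusion_pruning::PruningStatistics; // re-export location assumed

struct SparseStats {
    stats: Arc<HashMap<StatisticsRequest, StatisticsValue>>,
}

impl SparseStats {
    /// Materialize a 1-row array only when the map actually has an answer.
    fn scalar(&self, req: StatisticsRequest) -> Option<ArrayRef> {
        match self.stats.get(&req)? {
            StatisticsValue::Scalar(Precision::Exact(v) | Precision::Inexact(v)) => {
                v.to_array().ok()
            }
            _ => None,
        }
    }
}

impl PruningStatistics for SparseStats {
    fn min_values(&self, c: &Column) -> Option<ArrayRef> {
        self.scalar(StatisticsRequest::Min(c.name.clone()))
    }
    fn max_values(&self, c: &Column) -> Option<ArrayRef> {
        self.scalar(StatisticsRequest::Max(c.name.clone()))
    }
    fn null_counts(&self, c: &Column) -> Option<ArrayRef> {
        self.scalar(StatisticsRequest::NullCount(c.name.clone()))
    }
    fn row_counts(&self, _c: &Column) -> Option<ArrayRef> {
        self.scalar(StatisticsRequest::RowCount)
    }
    // One `PartitionedFile` == one container.
    fn num_containers(&self) -> usize {
        1
    }
    fn contained(&self, _: &Column, _: &HashSet<ScalarValue>) -> Option<BooleanArray> {
        None
    }
}
```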
#### `ListingTable` answers from footer metadata

When a caller passes `with_statistics_requests(...)`, `scan_with_args` populates `ScanResult.statistics` from the merged dense `Statistics` it already touched. Min/Max/NullCount/RowCount/TotalByteSize come back as the `Precision` value the format produced. DistinctCount/Sum/ByteSize come back as `Absent` for parquet — those aren't in thrift footers.

### Commit 2: `optimizer: derive StatisticsRequests from logical plan`

The producer side: now the planner actually asks.
* `TableScan` gains `statistics_requests: Vec<StatisticsRequest>` and a `with_statistics_requests` builder.
* New `RequestStatistics` `OptimizerRule` runs last in the default pipeline. Walks the optimized plan once, derives:
  * Sort → Min / Max / NullCount on each sort key
  * Filter → Min / Max / NullCount / DistinctCount on referenced columns
  * Join → DistinctCount / NullCount on join keys (both sides)
  * always → RowCount per scan

  Idempotent; never reshapes the plan, only annotates `TableScan`s.
TableScans.DefaultPhysicalPlannerreadsscan.statistics_requestsand threadsthem into
ScanArgs::with_statistics_requestswhen callingprovider.scan_with_args.End to end: SQL → optimizer →
TableScan.statistics_requests→physical planner →
provider.scan_with_args(ScanArgs{statistics_requests, ..})→ provider returns whatever it can on
ScanResult.statistics→caller (today: nothing built-in; tomorrow: a layered stats helper or
optimizer rule) consumes.
## Future work

### Advanced stats not available from Parquet footers
In designing this change I wanted to keep in mind some of the other goals discussed in #21624.
In particular, I think we should develop a story for being able to collect more advanced planning statistics ad-hoc when they are not available.
I was inspired by Floe SQL's CMU talk.
The idea would be:
1. Statistics requests are passed to `TableProvider` implementations (this PR) and they answer what they can. E.g. if your metadata store keeps track of most common values for a column, congrats! Populate it from there. But if you have a bunch of random Parquet files, just leave it blank.
2. Whatever remains unanswered is collected ad-hoc:

   a) File -> Row group -> Row Chunk sampling. Basically a semi-efficient way to say "get me a random sample of 1% of the data". I started POC work on this in feat: TABLESAMPLE SYSTEM end-to-end + row-group / row sampling on ParquetSource #22000, which also stands on its own.

   b) Time and row bounded scanning. This sampled data is a stream that flows into aggregators that compute stats. We put a time (say 150ms) and row number (say 100k) limit on this stream. This guarantees that our worst case cost is constant. If we bail out early we can emit our best estimate up to this point or leave the stats unpopulated.
We can also play all sorts of games with timing, e.g. we can start the query and the stats collection and race them. If the query finishes before the stats collection (possible if e.g. there's a single file and opening the file is the dominating cost), we just cancel stats collection. If stats collection finishes first, we can re-optimize the plan, and if the new plan is "better" enough that we think it's worth the price, we cancel and re-run the query. I imagine this might be a good config knob.
I also imagine there might be some hooks in the implementation of how to collect those stats. A default one we bundle might do the sampling from parquet files, but I can imagine users may want to hook in some catalog of stats they have if they deem it too expensive to run as part of `TableProvider::scan_with_args`, e.g. if they store a pre-sampled set of rows or advanced statistics like sketches (which can't be represented by Parquet's simplistic stats model) as an embedded index.

### Expression based statistics
Discussed in #20871. We represent struct field access in DataFusion as an expression, `get_field(col, 'field')`. It is not a column. Our existing statistics system cannot represent statistics for a struct field (only the top-level struct, which is meaningless most of the time) even though Parquet carries stats for leaf columns of nested types.

This proposal would make it much easier to implement this: we would need to expand `StatisticsRequest` with `Min(Expr)` / `Max(Expr)`, etc. We could fold `Column` into there or keep it as a special case. This mirrors what engines like Postgres can do (you can tell them to collect and store stats on expressions). Then a `PartitionedFile` could have `file.stats.get(Min(Expr(get_field(col, 'field')))) -> StatisticsResult::Scalar(...)` (pseudo code to avoid UDF invocation ceremony).

## Relationship to other PRs
There is a lot of work being done around statistics; I will mention just the ones that stand out in my mind.
This PR deals with 2 things:
It does not deal with:
### #19609
This PR deals with statistics propagation and statistics representation.
I don't think there is much conflict in propagation (the current PR does not deal with that) but the representation piece is interesting. I can see a world where the `HashMap<StatisticsRequest, StatisticsResponse>` is instead a `RecordBatch` plus a wrapper that offers an API like `stats.get([Min("column"), Max("other column")])` but knows how to look that up in the `RecordBatch` and produce a new `RecordBatch` as a result that can be used for vectorized operations on statistics.

Right now I am more concerned with the IO / memory cost of stats and having APIs to understand what stats are needed, and with requesting and representing advanced stats, than I am with vectorized execution on stats. In my timings from #21968 (comment), stats computation (e.g. `PruningPredicate`) is not a major issue; it's stats collection that costs us a lot. I've seen similar things in our system where we store stats in Parquet files. I also have some trouble grasping how we are going to fit things like sketches or histograms in arrow arrays, and what the advantages would be.

### #21122
This work also deals with statistics propagation. It is orthogonal, but there is some overlap worth discussing. In particular, in the section above about struct field stats I proposed we allow statistics requests to contain expressions. It then becomes unclear, in the case of `JOIN t2 ON coalesce(t1.col, '') = t2.col`, whether we should use the APIs in this PR to request `StatisticsRequest::Min(Expr(coalesce(t1.col, '')))` or request stats on `t1.col` and use something like #21122 to propagate them.
get_fieldfunctions are already special viadatafusion/datafusion/expr/src/udf.rs
Lines 972 to 984 in 1416ed4
Maybe we can use that existing API? Or we can come up with something new. But ultimately it seems resolvable to split expressions into ones we ask the data source for vs. ones we derive ourselves.
## Are there any user-facing changes?

API additions only, all opt-in:

* `ScanArgs` / `ScanResult` gain new fields with `Default`-friendly initializers; existing callers that don't use the new builders see no change.
* `FilePruner`'s field-type change is a private internal field.
* The only source-level break: `PartitionedFile` gains a new `pub satisfied_stats: Option<...>` field. Callers using `PartitionedFile::new` / `From<ObjectMeta>` / the existing builders are unaffected. Direct struct literals (uncommon, none in-tree) need to add `satisfied_stats: None` or migrate to the new `with_satisfied_stats` builder (sketched below).
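For illustration, a hedged sketch of that migration path; `with_satisfied_stats` is the builder named above, but its exact signature and the import paths are assumptions:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use datafusion_common::stats::{Precision, StatisticsRequest, StatisticsValue};
use datafusion_common::ScalarValue;
use datafusion_datasource::PartitionedFile; // path assumed

// Hypothetical: attach a sparse answer map when constructing a file.
fn make_file() -> PartitionedFile {
    let mut stats = HashMap::new();
    stats.insert(
        StatisticsRequest::RowCount,
        StatisticsValue::Scalar(Precision::Exact(ScalarValue::UInt64(Some(1_000)))),
    );
    PartitionedFile::new("data/part-0.parquet", 4096)
        .with_satisfied_stats(Arc::new(stats)) // assumed to take the Arc directly
}
```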